-
Notifications
You must be signed in to change notification settings - Fork 430
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[RayJob] implement deletion policy API #2643
base: master
Are you sure you want to change the base?
Conversation
08bdbfb
to
b2d43be
Compare
b2d43be
to
472b4c2
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you resolve the conflicts? Also, should we add some tests for this feature?
472b4c2
to
0db79c3
Compare
Fixed conflicts, will add tests tomorrow |
30adbd6
to
33747ec
Compare
Added unit tests, going to skip e2e tests for now since it's currently not trivial to enable feature gates in the e2e tests |
33747ec
to
f08fe12
Compare
f08fe12
to
fed8484
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
fed8484
to
5637ea0
Compare
rayJobInstance.Spec.DeletionPolicy != nil && | ||
*rayJobInstance.Spec.DeletionPolicy != rayv1.DeleteNoneDeletionPolicy && | ||
len(rayJobInstance.Spec.ClusterSelector) == 0 { | ||
logger.Info( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move
logger.Info(
"RayJob deployment status",
"jobDeploymentStatus", rayJobInstance.Status.JobDeploymentStatus,
"deletionPolicy", rayJobInstance.Spec.DeletionPolicy,
"ttlSecondsAfterFinished", ttlSeconds,
"Status.endTime", rayJobInstance.Status.EndTime,
"Now", nowTime,
"ShutdownTime", shutdownTime)
if shutdownTime.After(nowTime) {
delta := int32(time.Until(shutdownTime.Add(2 * time.Second)).Seconds())
logger.Info("shutdownTime not reached, requeue this RayJob for n seconds", "seconds", delta)
return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
}
to the above of if features.Enabled(features.RayJobDeletionPolicy) &&
and remove the similar logics from L391 to L403.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The conditions are split here because the keys passed into logger.Info
are different (shutdownAfterJobFinishes vs deletePolicy)
@@ -617,6 +655,31 @@ func (r *RayJobReconciler) deleteClusterResources(ctx context.Context, rayJobIns | |||
return isClusterDeleted, nil | |||
} | |||
|
|||
func (r *RayJobReconciler) scaleWorkerReplicasToZero(ctx context.Context, rayJobInstance *rayv1.RayJob) (bool, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need to return bool
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no, this is an oversight
@@ -617,6 +655,31 @@ func (r *RayJobReconciler) deleteClusterResources(ctx context.Context, rayJobIns | |||
return isClusterDeleted, nil | |||
} | |||
|
|||
func (r *RayJobReconciler) scaleWorkerReplicasToZero(ctx context.Context, rayJobInstance *rayv1.RayJob) (bool, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function may not work when autoscaling is enabled. If autoscaling is enabled, Pod deletion is always determined by the Ray Autoscaler. KubeRay will not delete any Pods, even if the number of Pods exceeds the goal state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see your point, but autoscaling with RayJob is pretty uncommon though right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One way to fix this is to also set max replicas to 0
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but autoscaling with RayJob is pretty uncommon though right?
I checked with my colleagues, and this may be incorrect. Autoscaling is not very common for Ray Train. However, it is commonly used for Ray Data, Ray Tune, and RLlib.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most Ray Data users use autoscaling.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't mean the Ray API, I mean autoscaling is not common when using the RayJob custom resource. I am sure Ray Data with RayCluster + autoscaling is very common
@@ -617,6 +655,31 @@ func (r *RayJobReconciler) deleteClusterResources(ctx context.Context, rayJobIns | |||
return isClusterDeleted, nil | |||
} | |||
|
|||
func (r *RayJobReconciler) scaleWorkerReplicasToZero(ctx context.Context, rayJobInstance *rayv1.RayJob) (bool, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typically, a K8s controller should only write to the CR status and treat the CR spec as read-only, but implementing this feature without writing to the CR spec is challenging for us.
Perhaps a compromise solution is to add a new field to the RayCluster CRD (e.g., suspendWorkers: bool
), where the RayJob controller only sets this field to true, and the RayCluster is responsible for deleting all Ray worker Pods.
This way, the RayJob controller doesn't need to modify replicas and minReplicas, which can also be modified by the Ray Autoscaler or users. Allowing multiple stakeholders to modify a field is typically the root cause of KubeRay's instability issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Controllers writing to spec is not necessarily bad, but I see what you mean. I think it would be wrong to write to RayJob spec from RayJob controller, but in this case we're writing to RayCluster spec from RayJob controller. I feel that updating replcias, minReplicas and maxReplicas for ephemeral RayCluster specifcally is actually fine because we don't care about the RayCluster spec once the cluster is deleted.
Will think about this more and get back to you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main concern is that multiple personas can modify these fields, such as users, the Autoscaler, and the RayJob controller.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps a compromise solution is to add a new field to the RayCluster CRD (e.g., suspendWorkers: bool), where the RayJob controller only sets this field to true, and the RayCluster is responsible for deleting all Ray worker Pods.
@kevin85421 how about a suspend
field per worker group in WorkerGroupSpec? This allows for granularity of suspension per worker, and from RayJob we can just set suspend: true
for all worker groups
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here's the draft PR #2663
Let me know what you think, I will clean up the PR and add tests if the API looks good to you
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The API looks good to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can split this issue into 3 PRs if the comment makes sense to you?
- Add a new field and feature flag in RayJob.
- Add a new field in RayCluster CRD to terminate all worker Pods.
- Implement the deletion policy API based on (2)
5637ea0
to
ff2deca
Compare
cb3980c
to
dbf7cef
Compare
Updated to use the new worker group suspend API |
dbf7cef
to
54d0ab6
Compare
Signed-off-by: Andrew Sy Kim <[email protected]>
54d0ab6
to
4c3a4a6
Compare
I will review the PR tomorrow. |
@@ -95,6 +104,12 @@ type RayJobSpec struct { | |||
// +kubebuilder:validation:XValidation:rule="self == oldSelf",message="the managedBy field is immutable" | |||
// +kubebuilder:validation:XValidation:rule="self in ['ray.io/kuberay-operator', 'kueue.x-k8s.io/multikueue']",message="the managedBy field value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'" | |||
ManagedBy *string `json:"managedBy,omitempty"` | |||
// deletionPolicy indicates what resources of the RayJob are deleted upon job completion. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit
// deletionPolicy indicates what resources of the RayJob are deleted upon job completion. | |
// DeletionPolicy indicates what resources of the RayJob are deleted upon job completion. |
return err | ||
} | ||
|
||
logger.Info("All worker groups for RayCluster has been scaled to 0", "RayCluster", clusterIdentifier) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
logger.Info("All worker groups for RayCluster has been scaled to 0", "RayCluster", clusterIdentifier) | |
logger.Info("All worker groups for RayCluster have had `suspend` set to true", "RayCluster", clusterIdentifier) |
} | ||
|
||
logger.Info("All worker groups for RayCluster has been scaled to 0", "RayCluster", clusterIdentifier) | ||
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, string(utils.UpdatedRayCluster), "Updated cluster %s/%s", cluster.Namespace, cluster.Name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, string(utils.UpdatedRayCluster), "Updated cluster %s/%s", cluster.Namespace, cluster.Name) | |
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, string(utils.UpdatedRayCluster), "Set the `suspend` field to true for all worker groups in cluster %s/%s", cluster.Namespace, cluster.Name) |
ttlSeconds := rayJobInstance.Spec.TTLSecondsAfterFinished | ||
nowTime := time.Now() | ||
shutdownTime := rayJobInstance.Status.EndTime.Add(time.Duration(ttlSeconds) * time.Second) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe move the check before if features.Enabled(features.RayJobDeletionPolicy) ...
to simplify the code?
if shutdownTime.After(nowTime) {
delta := int32(time.Until(shutdownTime.Add(2 * time.Second)).Seconds())
logger.Info("shutdownTime not reached; requeue this RayJob.", "requeue seconds", delta)
return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
}
@@ -347,10 +348,46 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) | |||
case rayv1.JobDeploymentStatusComplete, rayv1.JobDeploymentStatusFailed: | |||
// If this RayJob uses an existing RayCluster (i.e., ClusterSelector is set), we should not delete the RayCluster. | |||
logger.Info(string(rayJobInstance.Status.JobDeploymentStatus), "RayJob", rayJobInstance.Name, "ShutdownAfterJobFinishes", rayJobInstance.Spec.ShutdownAfterJobFinishes, "ClusterSelector", rayJobInstance.Spec.ClusterSelector) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move the log message below and add the following 4 keys:
ttlSecondsAfterFinished
Status.endTime
Now
ShutdownTime
Then, remove these 4 keys from the following log messages.
DeleteClusterDeletionPolicy DeletionPolicy = "DeleteCluster" | ||
DeleteWorkersDeletionPolicy DeletionPolicy = "DeleteWorkers" | ||
DeleteSelfDeletionPolicy DeletionPolicy = "DeleteSelf" | ||
DeleteNoneDeletionPolicy DeletionPolicy = "DeleteNone" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In some cases, users may want to have different delete policies for Ray jobs that succeeded and Ray jobs that failed. For example, DeleteCluster
if job succeeded and DeleteWorkers
if job failed.
Do we plan to support these cases? We don't need to include these policies in this PR, but I am wondering what the APIs would look like.
type DeletionPolicy string | ||
|
||
const ( | ||
DeleteClusterDeletionPolicy DeletionPolicy = "DeleteCluster" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you mind adding some comments to explain these policies?
Why are these changes needed?
Implement RayJob DeletionPolicy API
Related issue number
#2615
Checks